ECE M214A Project: Classification of Speaker Regions¶
In this project, we'll train a machine learning algorithm to classify speakers by regional dialect. We will use speech samples from the Corpus of Regional African American Language (CORAAL - https://oraal.uoregon.edu/coraal) with speakers each belonging to one of five different US cities: 1) Rochester, NY (ROC), 2) Lower East Side, Manhattan, NY (LES), 3) Washington DC (DCB), 4) Princeville, NC (PRV), or 5) Valdosta, GA (VLD).
The project files can be downloaded from this link (UCLA Account Required)
To do this, we will first extract features from the audio files and then train a classifier to predict the city of origin of the utterance's speaker. The goal is to extract a feature that contains useful information about regional dialect characteristics.
Authors: Keith Chen, Timothy Do, James Shiffer, Thomas Sison
Contact: timothydo@ucla.edu
1. Setup¶
try:
import google.colab
IN_COLAB = True
except ImportError:
IN_COLAB = False
print(f"In CoLab: {IN_COLAB}")
# Libraries
from IPython.display import Audio
import xgboost
import numpy as np
import shap
import pandas as pd
import random
from pydub import AudioSegment
import os
import librosa
import torchaudio
from glob import glob
from tqdm import tqdm
import pickle
import contextlib
import joblib
import math
import time
from sklearn import metrics
import matplotlib.pyplot as plt
import opensmile
import spafe.features.gfcc
import spafe.features.rplp
import spafe.features.pncc
# Clone the Repo if in CoLab & Install Dependencies
if(IN_COLAB):
!git clone https://github.com/dotimothy/M214A-Project.git
!mv -v ./M214A-Project/* .
!rm -rf M214A-Project
!pip install -r requirements.txt
if(not(os.path.exists('./W24 ECE M214A Project.zip'))):
if(IN_COLAB):
!gdown 1abjWNLbLzwJJ2qv_V0CfpfeMzs56sSut
else:
print('Please Download The Project Data (W24 ECE M214A Project.zip) and Try Again')
if(not(os.path.exists('./W24 ECE M214A Project/'))):
if(IN_COLAB):
!unzip "W24 ECE M214A Project.zip"
else:
print('Please Unzip the Project Data (W24 ECE M214A Project.zip) to (W24 ECE M214A Project/) and Try Again')
project_dir = './W24 ECE M214A Project'
In CoLab: False
2. Getting familiar with the data¶
Let's take a moment to understand the data. The original CORAAL dataset consists of ~85 different speakers, each from one of five cities. The audio files are named with the convention DCB_se1_ag1_f_03. Here, DCB is the city code, se1 denotes the socioeconomic group of the speaker, ag1 denotes the age group of the speaker, f denotes female, and 03 denotes the participant number. Together, these identifiers uniquely mark the speaker.
The dataset has been preprocessed to include only audio segments of at least 10 seconds in length. Each segment is numbered with an appended _seg_number tag.
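As an illustration, the naming convention can be parsed with plain string operations. The helper below is our own sketch for clarity, not part of the project code:

```python
def parse_coraal_name(stem):
    """Split a CORAAL-style file stem into its identifier fields.

    Example stem: 'DCB_se1_ag1_f_03_1_seg_3'
    """
    parts = stem.split('_')
    return {
        'city': parts[0],           # e.g. DCB
        'socioeconomic': parts[1],  # e.g. se1
        'age_group': parts[2],      # e.g. ag1
        'gender': parts[3],         # f or m
        'participant': parts[4],    # e.g. 03
        'segment': parts[-1],       # number after the 'seg' tag
    }

info = parse_coraal_name('DCB_se1_ag1_f_03_1_seg_3')
print(info['city'], info['segment'])  # DCB 3
```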
You can also try listening to any segment like this:
sr = 44100
# Audio(filename= "drive/MyDrive/project_data/train_clean/DCB_se1_ag1_f_03_1_seg_3.wav", rate=sr)
Audio(filename= f"{project_dir}/project_data/train_clean/DCB_se1_ag1_f_03_1_seg_3.wav", rate=sr)
The original dataset has also been split into a train and test set. The test set has been further split, with a portion corrupted with the addition of noise:
sr = 44100
# Audio(filename= "drive/MyDrive/project_data/test_noisy/LES_se0_ag3_f_01_1_seg_57.wav", rate=sr)
Audio(filename= f"{project_dir}/project_data/test_noisy/LES_se0_ag3_f_01_1_seg_57.wav", rate=sr)
3. Baseline Feature Extraction Methods¶
As a baseline, we will use the average MFCC value over time, computed with the Librosa Python library. Your job will be to choose better features to improve performance on both the clean and noisy data. M214A Group 5 also added multiprocessing (joblib) to accelerate feature extraction, options to append the MFCC feature's first and second derivatives, and a data augmentation scheme that mixes in multi-speaker chatter noise.
We first define a pair of functions to create features and labels for our classification model:
def extract_feature_mfcc(audio_file, n_mfcc=13,mfcc_1st=True,mfcc_2nd=False):
""" Function to Extract MFCC Coeffcients
n_mfcc: Number of MFCC coeffcients to extract
mfcc_1st: Flag on whether to append 1st Order MFCC gradient to feature (True/False)
mfcc_2nd: Flag on whether to append 2nd Order MFCC gradient to feature (True/False)
feat_out: Output MFCC Feature
"""
audio,fs = torchaudio.load(audio_file)
audio = audio.numpy().reshape(-1)
# mfcc
mfccs = librosa.feature.mfcc(y=audio, sr=fs, n_mfcc=n_mfcc)
feat_out = np.mean(mfccs,axis=1)
if(mfcc_1st):
mfccs_delta1 = librosa.feature.delta(mfccs,order=1)
delta1_out = np.mean(mfccs_delta1,axis=1)
feat_out = np.concatenate((feat_out,delta1_out),axis=None)
if(mfcc_2nd):
mfccs_delta2 = librosa.feature.delta(mfccs,order=2)
delta2_out = np.mean(mfccs_delta2,axis=1)
feat_out = np.concatenate((feat_out,delta2_out),axis=None)
return feat_out
def get_label(file_name):
"""
Function to retrieve output labels from filenames
file_name: Path to WAV File
label: Output label as an integer
"""
if 'ROC' in file_name:
label=0
elif 'LES' in file_name:
label=1
elif 'DC' in file_name:
label=2
elif 'PRV' in file_name:
label=3
elif 'VLD' in file_name:
label=4
else:
raise ValueError('invalid file name')
return label
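The chain of elif branches can equivalently be written as a table lookup, which also documents the label ordering in one place. This is our own alternative sketch, not part of the baseline code:

```python
# Label ordering matches get_label above: ROC=0, LES=1, DCB=2, PRV=3, VLD=4
CITY_CODES = ['ROC', 'LES', 'DCB', 'PRV', 'VLD']

def get_label_lookup(file_name):
    """Table-driven equivalent of get_label."""
    for label, code in enumerate(CITY_CODES):
        if code in file_name:
            return label
    raise ValueError('invalid file name')

print(get_label_lookup('DCB_se1_ag1_f_03_1_seg_3.wav'))  # 2
```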
test_wav = f'{project_dir}/project_data/train_clean/DCB_se1_ag1_f_01_1_seg_0.wav'
print(extract_feature_mfcc(test_wav,13,False,False).shape)
(13,)
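That (13,) shape follows directly from the flags: the feature length is n_mfcc times (1 + the number of appended delta orders). A quick sanity check with a hypothetical helper of our own:

```python
def mfcc_feat_len(n_mfcc, mfcc_1st, mfcc_2nd):
    """Expected length of extract_feature_mfcc output:
    one block of n_mfcc means, plus one block per appended delta order."""
    return n_mfcc * (1 + int(mfcc_1st) + int(mfcc_2nd))

print(mfcc_feat_len(13, False, False))  # 13
print(mfcc_feat_len(13, True, False))   # 26
print(mfcc_feat_len(13, True, True))    # 39
```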
Multiprocessing Functions are Defined Below to Utilize all the CPU Cores during Feature Extraction:
# Allows for Parallel Progress Bar with Joblib
# Source: https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution/58936697#58936697
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
def __call__(self, *args, **kwargs):
tqdm_object.update(n=self.batch_size)
return super().__call__(*args, **kwargs)
old_batch_callback = joblib.parallel.BatchCompletionCallBack
joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
try:
yield tqdm_object
finally:
joblib.parallel.BatchCompletionCallBack = old_batch_callback
tqdm_object.close()
def extract_feature_list(wav_files,extract_feature,args):
""" Extract Feature of a List of WAV files
wav_files: List of WAV Files to extract Features
extract_feature: Function to extract_feature
args: Arguments to pass to extract feature (without wav path)
"""
wav_files.sort()
wav_feat = []
for wav in tqdm(wav_files):
wav_feat.append(extract_feature(wav,*args))
return wav_feat
# Function to Divide a List
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
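For example, chunking a five-element list into chunks of size two yields two full chunks and one remainder chunk:

```python
def chunks(lst, n):
    """Yield successive n-sized chunks from lst (same helper as above)."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

print(list(chunks([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
```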
# Overall Feature Extraction with Multiprocessing Acceleration
def extract_feature_dir(wav_files,extract_feature,args,accel=False,safe=True):
""" Extract Feature of Directory of WAV Files
wav_files: List of WAV Files to extract features
extract_feature: Function to extract_feature
args: Arguments to pass to extract feature (without wav path)
accel: Flag to enable acceleration with multiprocessing (True/False)
safe: Flag to use half or all CPU Threads (True/False)
"""
wav_files.sort()
wav_feat = []
if(accel):
num_processes = max(round(joblib.cpu_count()/2),1) if safe else joblib.cpu_count()
chunk_size = max(math.ceil(len(wav_files)/num_processes),num_processes)
wav_chunks = list(chunks(wav_files,chunk_size))
# To Implement Multi-Processing
with tqdm_joblib(tqdm(total=len(wav_chunks))) as pb:
results = joblib.Parallel(n_jobs=num_processes)(joblib.delayed(extract_feature_list)(wav_chunk,extract_feature,args) for wav_chunk in wav_chunks)
wav_feat = [x for xs in results for x in xs]
else:
wav_feat = extract_feature_list(wav_files,extract_feature,args)
return wav_feat
# Getting All Labels in a Directory
def get_label_dir(wav_files):
return [get_label(wav) for wav in tqdm(wav_files)]
# Test Parallelism
test_clean_files = glob(f'{project_dir}/project_data/test_clean/*.wav')
test_clean_files.sort()
args = (13,True,False)
start = time.time()
test_clean_feat = extract_feature_dir(test_clean_files,extract_feature_mfcc,args,False)
singleTime = time.time()-start
start = time.time()
test_clean_feat = extract_feature_dir(test_clean_files,extract_feature_mfcc,args,True)
multiTime = time.time() - start
print(f'Single-Core Extraction: {singleTime} s')
print(f'Multi-Core Extraction: {multiTime} s')
print(f'Speedup: {(singleTime/multiTime):.2f}x')
Single-Core Extraction: 23.583979606628418 s
Multi-Core Extraction: 9.192101001739502 s
Speedup: 2.57x
Below is our data augmentation scheme, which mixes background chatter noise into the training data. For compatibility purposes (in case augmenting doesn't work with pretrained features), pre-augmented features and labels are available in the repository under ./features_augmented_const; the pre-augmented training set is available at this link.
# Data Augmentation by Adding Background Chatter Noise mixed-in
# Augment Flag, Enabling it would Augment the Training Data for Feature Extraction
augment = True
usePreAugmented = True
augmentAccel = True
augmentVerbose = False
# Add random chatter mixed into background of audio clip
def add_human_chatter_random(input_audio_file, chatter_audio_files, output_folder, chatter_intensity):
""" Add random chatter mixed into background of audio clip
input_audio_file: Input Audio File
chatter_audio_files: List of chatter audio files
output_folder: Output folder for augmented data
chatter_intensity: Intensity of Chatter Noise (in dB)
"""
audio = AudioSegment.from_file(input_audio_file)
chatter_audio_file = random.choice(chatter_audio_files)
chatter = AudioSegment.from_file(chatter_audio_file)
# chatter audio length should be longer than audio clip length
if len(chatter) < len(audio):
raise ValueError("Chatter audio must be longer than input audio")
adjusted_chatter = (chatter * (len(audio) // len(chatter) + 1)) + chatter_intensity
mixed_audio = audio.overlay(adjusted_chatter[:len(audio)])
output_file = os.path.join(output_folder, os.path.basename(input_audio_file))
mixed_audio.export(output_file, format="wav")
# Multiprocessing Add Human Chatter
def add_human_chatter_list(input_audio_list,chatter_audio_files, output_folder, chatter_intensity,augmentVerbose=False):
""" Add random chatter mixed into background of list of audio clips
input_audio_list: List of Audio Files to mix with chatter noise
chatter_audio_files: List of chatter audio files
output_folder: Output folder for augmented data
chatter_intensity: Intensity of Chatter Noise (in dB)
augmentVerbose: Flag to enable error printouts (True/False)
"""
for file in tqdm(input_audio_list):
try:
add_human_chatter_random(file, chatter_audio_files, output_folder, chatter_intensity)
except ValueError as e:
if(augmentVerbose):
print(e)
def add_human_chatter_dir(input_audio_list,chatter_audio_files, output_folder, chatter_intensity, augmentVerbose=False, accel=False, safe=True):
""" Add random chatter mixed into background of list of audio clips
input_audio_list: List of Audio Files to mix with chatter noise
chatter_audio_files: List of chatter audio files
output_folder: Output folder for augmented data
chatter_intensity: Intensity of Chatter Noise (in dB)
augmentVerbose: Flag to enable error printouts (True/False)
accel: Flag to accelerate data augmentation (True/False)
safe: Flag to use half or all CPU Threads (True/False)
"""
random.seed(0) # Make the augmentation repeatable
input_audio_list.sort()
if(accel):
num_processes = max(round(joblib.cpu_count()/2),1) if safe else joblib.cpu_count()
chunk_size = max(math.ceil(len(input_audio_list)/num_processes),num_processes)
wav_chunks = list(chunks(input_audio_list,chunk_size))
# To Implement Multi-Processing
with tqdm_joblib(tqdm(total=len(wav_chunks))) as pb:
joblib.Parallel(n_jobs=num_processes)(joblib.delayed(add_human_chatter_list)(wav_chunk,chatter_audio_files,output_folder,chatter_intensity,augmentVerbose) for wav_chunk in wav_chunks)
else:
add_human_chatter_list(input_audio_list,chatter_audio_files,output_folder,chatter_intensity,augmentVerbose)
all_train_files = glob(f'{project_dir}/project_data/train_clean/*.wav')
chatter_audio_files = glob(f'./chatter_audio_folder/*.wav')
output_folder = f'{project_dir}/project_data/augmented_data_with_adjustable_chatter'
chatter_intensity = -21 # Adjust chatter volume in dB
# Add chatter to background of training audio files
if(augment):
if not(os.path.exists(output_folder)) and not(usePreAugmented): # Augment only if augmented data doesn't exist
os.makedirs(output_folder)
add_human_chatter_dir(all_train_files,chatter_audio_files,output_folder,chatter_intensity,augmentVerbose,augmentAccel)
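As a reference point for choosing chatter_intensity: pydub applies gain in dB, so -21 dB scales the chatter amplitude by 10^(-21/20) ≈ 0.089. A small conversion helper of our own makes this concrete:

```python
def db_to_amplitude_ratio(db):
    """Convert a dB gain (as used by pydub's + operator) to a linear
    amplitude scale factor: ratio = 10 ** (db / 20)."""
    return 10 ** (db / 20)

print(round(db_to_amplitude_ratio(-21), 3))  # 0.089
print(round(db_to_amplitude_ratio(-6), 3))   # 0.501 (roughly half amplitude)
```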
We will now extract labels from the train_clean, test_clean, and test_noisy directories.
# Directories Definitions and Grabbing Labels
# Based on if Augmentation is Enabled and if using Preaugmented Data
feature_dir = f'./features_augmented_data_const' if(usePreAugmented and augment) else f'./features_augmented_data' if augment else f'./features'
if(not(os.path.exists(feature_dir))):
os.mkdir(feature_dir)
train_files = glob(f'{project_dir}/project_data/train_clean/*.wav')
if(augment):
train_files += glob(f'{output_folder}/*.wav')
train_files.sort()
test_clean_files = glob(f'{project_dir}/project_data/test_clean/*.wav')
test_clean_files.sort()
test_noisy_files = glob(f'{project_dir}/project_data/test_noisy/*.wav')
test_noisy_files.sort()
# Getting Labels
train_label = []
if(usePreAugmented and os.path.exists(f'{feature_dir}/train_label.txt')):
with open(f'{feature_dir}/train_label.txt','rb') as label:
train_label = pickle.load(label)
else:
train_label = get_label_dir(train_files)
if(usePreAugmented):
with open(f'{feature_dir}/train_label.txt','wb') as label:
pickle.dump(train_label,label)
test_clean_label = get_label_dir(test_clean_files)
test_noisy_label = get_label_dir(test_noisy_files)
Here we run the baseline MFCC feature extraction over the prepared directories.
# MFCC Feature Extraction
accel = True
n_mfcc = 13
mfcc_1st = True
mfcc_2nd = False
args = (n_mfcc,mfcc_1st,mfcc_2nd)
trainMFCC = f'{feature_dir}/mfcc{n_mfcc}{"_delta1" if mfcc_1st else ""}{"_delta2" if mfcc_2nd else ""}_train.csv'
testCleanMFCC = f'{feature_dir}/mfcc{n_mfcc}{"_delta1" if mfcc_1st else ""}{"_delta2" if mfcc_2nd else ""}_test_clean.csv'
testNoisyMFCC = f'{feature_dir}/mfcc{n_mfcc}{"_delta1" if mfcc_1st else ""}{"_delta2" if mfcc_2nd else ""}_test_noisy.csv'
if(not(os.path.exists(trainMFCC))):
train_feat = extract_feature_dir(train_files,extract_feature_mfcc,args,accel)
if(not(os.path.exists(testCleanMFCC))):
test_clean_feat = extract_feature_dir(test_clean_files,extract_feature_mfcc,args,accel)
if(not(os.path.exists(testNoisyMFCC))):
test_noisy_feat = extract_feature_dir(test_noisy_files,extract_feature_mfcc,args,accel)
4. Model Training and Predictions¶
Now we'll train the backend system to predict the regions from the input features. We'll use an XGBoost gradient-boosted decision tree classifier. An advantage of this model is that we can also parse the decision trees and measure the impact of different features on the end result, for explainability.
# MFCC Features
feat_names = ['mfcc_' + str(n) for n in range(n_mfcc)]
if(mfcc_1st):
feat_names += ['mfcc_delta1_' + str(n) for n in range(n_mfcc)]
if(mfcc_2nd):
feat_names += ['mfcc_delta2_' + str(n) for n in range(n_mfcc)]
# Prepare Training Set
if(not(os.path.exists(trainMFCC))):
train_feat_df = pd.DataFrame(data=np.stack(train_feat), columns=feat_names)
train_feat_df.to_csv(trainMFCC)
else:
train_feat_df = pd.read_csv(trainMFCC,index_col=0)
y_train = np.stack(train_label)
# Prepare Test Clean Set
if(not(os.path.exists(testCleanMFCC))):
test_clean_feat_df = pd.DataFrame(data=np.stack(test_clean_feat), columns=feat_names)
test_clean_feat_df.to_csv(testCleanMFCC,)
else:
test_clean_feat_df = pd.read_csv(testCleanMFCC,index_col=0)
y_test_clean = np.stack(test_clean_label)
# Prepare Test Noisy Set
if(not(os.path.exists(testNoisyMFCC))):
test_noisy_feat_df = pd.DataFrame(data=np.stack(test_noisy_feat), columns=feat_names)
test_noisy_feat_df.to_csv(testNoisyMFCC)
else:
test_noisy_feat_df = pd.read_csv(testNoisyMFCC,index_col=0)
y_test_noisy = np.stack(test_noisy_label)
#you could just pass in the matrix of features to xgboost
#but it looks prettier in the shap explainer if you format it
#as a dataframe.
model = xgboost.XGBClassifier()
model.fit(train_feat_df,y_train)
print("Train MFCC Clean Acc =", np.sum(y_train==model.predict(train_feat_df))/len(y_train))
print("Test MFCC Clean Acc =", np.sum(y_test_clean==model.predict(test_clean_feat_df))/len(y_test_clean))
print("Test MFCC Noisy Acc =", np.sum(y_test_noisy==model.predict(test_noisy_feat_df))/len(y_test_noisy))
Train MFCC Clean Acc = 1.0
Test MFCC Clean Acc = 0.785234899328859
Test MFCC Noisy Acc = 0.6657060518731989
5. Interpreting Results and Explainability¶
To see the impact different features have on the model, we create a plot of the feature importances. The features are listed top to bottom in order of how important they were to the decision.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(train_feat_df)
shap.summary_plot(shap_values, train_feat_df)
We can also view confusion matrices of the predictions on the clean and noisy test sets:
confusion_matrix_clean = metrics.confusion_matrix(y_test_clean, model.predict(test_clean_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
confusion_matrix_noisy = metrics.confusion_matrix(y_test_noisy, model.predict(test_noisy_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
6. Opensmile¶
Feature Set GeMAPSv01b¶
smile = opensmile.Smile(
feature_set=opensmile.FeatureSet.GeMAPSv01b,
feature_level=opensmile.FeatureLevel.Functionals,
)
print(f'Opensmile Features: {smile.feature_names}')
print(f'\nTotal Number of Features: {len(smile.feature_names)}')
Opensmile Features: ['F0semitoneFrom27.5Hz_sma3nz_amean', 'F0semitoneFrom27.5Hz_sma3nz_stddevNorm', 'F0semitoneFrom27.5Hz_sma3nz_percentile20.0', 'F0semitoneFrom27.5Hz_sma3nz_percentile50.0', 'F0semitoneFrom27.5Hz_sma3nz_percentile80.0', 'F0semitoneFrom27.5Hz_sma3nz_pctlrange0-2', 'F0semitoneFrom27.5Hz_sma3nz_meanRisingSlope', 'F0semitoneFrom27.5Hz_sma3nz_stddevRisingSlope', 'F0semitoneFrom27.5Hz_sma3nz_meanFallingSlope', 'F0semitoneFrom27.5Hz_sma3nz_stddevFallingSlope', 'loudness_sma3_amean', 'loudness_sma3_stddevNorm', 'loudness_sma3_percentile20.0', 'loudness_sma3_percentile50.0', 'loudness_sma3_percentile80.0', 'loudness_sma3_pctlrange0-2', 'loudness_sma3_meanRisingSlope', 'loudness_sma3_stddevRisingSlope', 'loudness_sma3_meanFallingSlope', 'loudness_sma3_stddevFallingSlope', 'jitterLocal_sma3nz_amean', 'jitterLocal_sma3nz_stddevNorm', 'shimmerLocaldB_sma3nz_amean', 'shimmerLocaldB_sma3nz_stddevNorm', 'HNRdBACF_sma3nz_amean', 'HNRdBACF_sma3nz_stddevNorm', 'logRelF0-H1-H2_sma3nz_amean', 'logRelF0-H1-H2_sma3nz_stddevNorm', 'logRelF0-H1-A3_sma3nz_amean', 'logRelF0-H1-A3_sma3nz_stddevNorm', 'F1frequency_sma3nz_amean', 'F1frequency_sma3nz_stddevNorm', 'F1bandwidth_sma3nz_amean', 'F1bandwidth_sma3nz_stddevNorm', 'F1amplitudeLogRelF0_sma3nz_amean', 'F1amplitudeLogRelF0_sma3nz_stddevNorm', 'F2frequency_sma3nz_amean', 'F2frequency_sma3nz_stddevNorm', 'F2amplitudeLogRelF0_sma3nz_amean', 'F2amplitudeLogRelF0_sma3nz_stddevNorm', 'F3frequency_sma3nz_amean', 'F3frequency_sma3nz_stddevNorm', 'F3amplitudeLogRelF0_sma3nz_amean', 'F3amplitudeLogRelF0_sma3nz_stddevNorm', 'alphaRatioV_sma3nz_amean', 'alphaRatioV_sma3nz_stddevNorm', 'hammarbergIndexV_sma3nz_amean', 'hammarbergIndexV_sma3nz_stddevNorm', 'slopeV0-500_sma3nz_amean', 'slopeV0-500_sma3nz_stddevNorm', 'slopeV500-1500_sma3nz_amean', 'slopeV500-1500_sma3nz_stddevNorm', 'alphaRatioUV_sma3nz_amean', 'hammarbergIndexUV_sma3nz_amean', 'slopeUV0-500_sma3nz_amean', 'slopeUV500-1500_sma3nz_amean', 'loudnessPeaksPerSec', 
'VoicedSegmentsPerSec', 'MeanVoicedSegmentLengthSec', 'StddevVoicedSegmentLengthSec', 'MeanUnvoicedSegmentLength', 'StddevUnvoicedSegmentLength'] Total Number of Features: 62
Feature Set eGeMAPSv02¶
smile2 = opensmile.Smile(
feature_set=opensmile.FeatureSet.eGeMAPSv02,
feature_level=opensmile.FeatureLevel.LowLevelDescriptors,
)
print(f'OpenSmile2 Features: {smile2.feature_names}')
print(f'\nTotal Number of Features: {len(smile2.feature_names)}')
OpenSmile2 Features: ['Loudness_sma3', 'alphaRatio_sma3', 'hammarbergIndex_sma3', 'slope0-500_sma3', 'slope500-1500_sma3', 'spectralFlux_sma3', 'mfcc1_sma3', 'mfcc2_sma3', 'mfcc3_sma3', 'mfcc4_sma3', 'F0semitoneFrom27.5Hz_sma3nz', 'jitterLocal_sma3nz', 'shimmerLocaldB_sma3nz', 'HNRdBACF_sma3nz', 'logRelF0-H1-H2_sma3nz', 'logRelF0-H1-A3_sma3nz', 'F1frequency_sma3nz', 'F1bandwidth_sma3nz', 'F1amplitudeLogRelF0_sma3nz', 'F2frequency_sma3nz', 'F2bandwidth_sma3nz', 'F2amplitudeLogRelF0_sma3nz', 'F3frequency_sma3nz', 'F3bandwidth_sma3nz', 'F3amplitudeLogRelF0_sma3nz'] Total Number of Features: 25
def extract_smile(wav,smileType):
""" Extracting Features with the Opensmile Library
wav: Wav sample to extract Opensmile feature
smileType: Choice of OpenSmile Feature Set (0: GeMAPSv01b, 1: eGeMAPSv02)
np.array(y.iloc[0]): Opensmile Extracted Feature
"""
audio,sample_fs = torchaudio.load(wav)
sample_audio = audio.numpy().reshape(-1)
feature_sets = [opensmile.FeatureSet.GeMAPSv01b,opensmile.FeatureSet.eGeMAPSv02]
feature_levels = [opensmile.FeatureLevel.Functionals,opensmile.FeatureLevel.LowLevelDescriptors]
smile = opensmile.Smile(
feature_set=feature_sets[smileType],
feature_level=feature_levels[smileType]
)
#opensmile.FeatureSet.ComParE_2016,
#opensmile.FeatureSet.eGeMAPSv02
y = smile.process_signal(
sample_audio,
sample_fs
)
return np.array(y.iloc[0])
test_wav = f'{project_dir}/project_data/train_clean/DCB_se1_ag1_f_01_1_seg_0.wav'
print(extract_smile(test_wav,0).shape)
print(extract_smile(test_wav,1).shape)
(62,) (25,)
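Note the distinction between the two feature levels: Functionals produce a single summary row per signal, while LowLevelDescriptors produce one row per frame, so np.array(y.iloc[0]) keeps only the first frame when smileType=1. If a per-utterance summary of the LLDs is wanted instead, averaging over frames is one option. A sketch with a dummy frame matrix (not actual opensmile output):

```python
import numpy as np
import pandas as pd

# Dummy stand-in for an LLD output: 4 frames x 3 descriptors
y = pd.DataFrame(np.arange(12, dtype=float).reshape(4, 3),
                 columns=['lld_a', 'lld_b', 'lld_c'])

first_frame = np.array(y.iloc[0])           # only frame 0: [0., 1., 2.]
mean_over_time = y.mean(axis=0).to_numpy()  # [4.5, 5.5, 6.5]
print(first_frame, mean_over_time)
```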
# Extracting Smile Features
trainOS = f'{feature_dir}/os_train.csv'
testCleanOS = f'{feature_dir}/os_test_clean.csv'
testNoisyOS = f'{feature_dir}/os_test_noisy.csv'
args = (0,)
accel = True
if(not(os.path.exists(trainOS))):
train_feat_os = extract_feature_dir(train_files,extract_smile,args,accel)
if(not(os.path.exists(testCleanOS))):
test_clean_feat_os = extract_feature_dir(test_clean_files,extract_smile,args,accel)
if(not(os.path.exists(testNoisyOS))):
test_noisy_feat_os = extract_feature_dir(test_noisy_files,extract_smile,args,accel)
# Evaluating OS Feature
feat_names_os = list(smile.feature_names)
if(not(os.path.exists(trainOS))):
train_feat_os_df = pd.DataFrame(data=np.stack(train_feat_os), columns=feat_names_os)
train_feat_os_df.to_csv(trainOS)
else:
train_feat_os_df = pd.read_csv(trainOS,index_col=0)
if(not(os.path.exists(testCleanOS))):
test_clean_feat_os_df = pd.DataFrame(data=np.stack(test_clean_feat_os), columns=feat_names_os)
test_clean_feat_os_df.to_csv(testCleanOS)
else:
test_clean_feat_os_df = pd.read_csv(testCleanOS,index_col=0)
if(not(os.path.exists(testNoisyOS))):
test_noisy_feat_os_df = pd.DataFrame(data=np.stack(test_noisy_feat_os), columns=feat_names_os)
test_noisy_feat_os_df.to_csv(testNoisyOS)
else:
test_noisy_feat_os_df = pd.read_csv(testNoisyOS,index_col=0)
model_os = xgboost.XGBClassifier()
model_os.fit(train_feat_os_df,y_train)
print("Train Clean Opensmile Acc =", np.sum(y_train==model_os.predict(train_feat_os_df))/len(y_train))
print("Test Clean Opensmile Acc =", np.sum(y_test_clean==model_os.predict(test_clean_feat_os_df))/len(y_test_clean))
print("Test Noisy Opensmile Acc =", np.sum(y_test_noisy==model_os.predict(test_noisy_feat_os_df))/len(y_test_noisy))
print()
explainer = shap.TreeExplainer(model_os)
shap_values_os = explainer.shap_values(train_feat_os_df)
shap.summary_plot(shap_values_os, train_feat_os_df)
print()
confusion_matrix_clean_os = metrics.confusion_matrix(y_test_clean, model_os.predict(test_clean_feat_os_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean_os, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy_os = metrics.confusion_matrix(y_test_noisy, model_os.predict(test_noisy_feat_os_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy_os, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Train Clean Opensmile Acc = 1.0
Test Clean Opensmile Acc = 0.912751677852349
Test Noisy Opensmile Acc = 0.5389048991354467
# Extracting Smile2 Features
trainOS2 = f'{feature_dir}/os2_train.csv'
testCleanOS2 = f'{feature_dir}/os2_test_clean.csv'
testNoisyOS2 = f'{feature_dir}/os2_test_noisy.csv'
args = (1,)
accel = True
if(not(os.path.exists(trainOS2))):
train_feat_os2 = extract_feature_dir(train_files,extract_smile,args,accel)
if(not(os.path.exists(testCleanOS2))):
test_clean_feat_os2 = extract_feature_dir(test_clean_files,extract_smile,args,accel)
if(not(os.path.exists(testNoisyOS2))):
test_noisy_feat_os2 = extract_feature_dir(test_noisy_files,extract_smile,args,accel)
# Evaluating OS2 Features
feat_names_os2 = list(smile2.feature_names)
if(not(os.path.exists(trainOS2))):
train_feat_os2_df = pd.DataFrame(data=np.stack(train_feat_os2), columns=feat_names_os2)
train_feat_os2_df.to_csv(trainOS2)
else:
train_feat_os2_df = pd.read_csv(trainOS2,index_col=0)
if(not(os.path.exists(testCleanOS2))):
test_clean_feat_os2_df = pd.DataFrame(data=np.stack(test_clean_feat_os2), columns=feat_names_os2)
test_clean_feat_os2_df.to_csv(testCleanOS2)
else:
test_clean_feat_os2_df = pd.read_csv(testCleanOS2,index_col=0)
if(not(os.path.exists(testNoisyOS2))):
test_noisy_feat_os2_df = pd.DataFrame(data=np.stack(test_noisy_feat_os2), columns=feat_names_os2)
test_noisy_feat_os2_df.to_csv(testNoisyOS2)
else:
test_noisy_feat_os2_df = pd.read_csv(testNoisyOS2,index_col=0)
model_os2 = xgboost.XGBClassifier()
model_os2.fit(train_feat_os2_df,y_train)
print("Train Clean Opensmile 2 Acc =", np.sum(y_train==model_os2.predict(train_feat_os2_df))/len(y_train))
print("Test Clean Opensmile 2 Acc =", np.sum(y_test_clean==model_os2.predict(test_clean_feat_os2_df))/len(y_test_clean))
print("Test Noisy Opensmile 2 Acc =", np.sum(y_test_noisy==model_os2.predict(test_noisy_feat_os2_df))/len(y_test_noisy))
print()
explainer = shap.TreeExplainer(model_os2)
shap_values_os2 = explainer.shap_values(train_feat_os2_df)
shap.summary_plot(shap_values_os2, train_feat_os2_df)
print()
confusion_matrix_clean_os2 = metrics.confusion_matrix(y_test_clean, model_os2.predict(test_clean_feat_os2_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean_os2, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy_os2 = metrics.confusion_matrix(y_test_noisy, model_os2.predict(test_noisy_feat_os2_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy_os2, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Train Clean Opensmile 2 Acc = 0.9924829692271553
Test Clean Opensmile 2 Acc = 0.7024608501118568
Test Noisy Opensmile 2 Acc = 0.45821325648414984
7. Spafe (Advanced Features)¶
Gammatone Frequency Cepstral Coefficients (GFCC)¶
sr = 44100
def extract_feature_gfcc(audio_file, n_gfcc=13):
""" Extract GFCC Features
audio_file: WAV sample to extract GFCC feature
n_gfcc: Number of GFCC coefficients in feature
feat_out: GFCC Feature of the WAV sample
"""
audio,fs = torchaudio.load(audio_file)
audio = audio.numpy().reshape(-1)
gfccs = spafe.features.gfcc.gfcc(sig=audio, fs=fs, num_ceps=n_gfcc)
feat_out = np.mean(gfccs,axis=0)
return feat_out
test_wav = f'{project_dir}/project_data/train_clean/DCB_se1_ag1_f_01_1_seg_0.wav'
print(extract_feature_gfcc(test_wav,13))
[ 3.52699334e-02 -5.40729009e-05 -6.92793861e-03 -6.57449170e-03 -4.62176106e-03 1.73811028e-03 1.10878099e-03 3.62485582e-03 -9.84202129e-04 -1.08928857e-03 -3.15170819e-04 -8.84683817e-04 4.37231553e-04]
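Note the axis difference from the MFCC extractor: librosa returns coefficients as (n_coeffs, n_frames), so we averaged over axis=1, while spafe returns (n_frames, n_coeffs), so here we average over axis=0. A toy check of the two conventions with zero-filled arrays:

```python
import numpy as np

librosa_style = np.zeros((13, 100))  # (n_coeffs, n_frames), as librosa returns
spafe_style = np.zeros((100, 13))    # (n_frames, n_coeffs), as spafe returns

# Both reductions collapse the time axis, leaving one value per coefficient
print(np.mean(librosa_style, axis=1).shape)  # (13,)
print(np.mean(spafe_style, axis=0).shape)    # (13,)
```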
# GFCC Feature Extraction
accel = True
n_gfcc = 13
args = (n_gfcc,)
trainGFCC = f'{feature_dir}/gfcc{n_gfcc}_train.csv'
testCleanGFCC = f'{feature_dir}/gfcc{n_gfcc}_test_clean.csv'
testNoisyGFCC = f'{feature_dir}/gfcc{n_gfcc}_test_noisy.csv'
if(not(os.path.exists(trainGFCC))):
train_feat = extract_feature_dir(train_files,extract_feature_gfcc,args,accel)
if(not(os.path.exists(testCleanGFCC))):
test_clean_feat = extract_feature_dir(test_clean_files,extract_feature_gfcc,args,accel)
if(not(os.path.exists(testNoisyGFCC))):
test_noisy_feat = extract_feature_dir(test_noisy_files,extract_feature_gfcc,args,accel)
# GFCC Features
feat_names = ['gfcc_' + str(n) for n in range(n_gfcc)]
# Prepare Training Set
if(not(os.path.exists(trainGFCC))):
train_feat_df = pd.DataFrame(data=np.stack(train_feat), columns=feat_names)
train_feat_df.to_csv(trainGFCC)
else:
train_feat_df = pd.read_csv(trainGFCC,index_col=0)
y_train = np.stack(train_label)
# Prepare Test Clean Set
if(not(os.path.exists(testCleanGFCC))):
test_clean_feat_df = pd.DataFrame(data=np.stack(test_clean_feat), columns=feat_names)
test_clean_feat_df.to_csv(testCleanGFCC,)
else:
test_clean_feat_df = pd.read_csv(testCleanGFCC,index_col=0)
y_test_clean = np.stack(test_clean_label)
# Prepare Test Noisy Set
if(not(os.path.exists(testNoisyGFCC))):
test_noisy_feat_df = pd.DataFrame(data=np.stack(test_noisy_feat), columns=feat_names)
test_noisy_feat_df.to_csv(testNoisyGFCC)
else:
test_noisy_feat_df = pd.read_csv(testNoisyGFCC,index_col=0)
y_test_noisy = np.stack(test_noisy_label)
model = xgboost.XGBClassifier()
model.fit(train_feat_df,y_train)
print("Train GFCC Clean Acc =", np.sum(y_train==model.predict(train_feat_df))/len(y_train))
print("Test GFCC Clean Acc =", np.sum(y_test_clean==model.predict(test_clean_feat_df))/len(y_test_clean))
print("Test GFCC Noisy Acc =", np.sum(y_test_noisy==model.predict(test_noisy_feat_df))/len(y_test_noisy))
print()
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(train_feat_df)
shap.summary_plot(shap_values, train_feat_df)
print()
confusion_matrix_clean = metrics.confusion_matrix(y_test_clean, model.predict(test_clean_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy= metrics.confusion_matrix(y_test_noisy, model.predict(test_noisy_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Train GFCC Clean Acc = 1.0
Test GFCC Clean Acc = 0.6890380313199105
Test GFCC Noisy Acc = 0.8155619596541787
Perceptual Linear Prediction Coefficients (PLP)¶
sr = 44100
def extract_feature_plp(audio_file, order=13):
""" Extract PLP Features
audio_file: WAV sample to extract PLP feature
order: PLP Prediction Order
feat_out: PLP Feature of the WAV sample
"""
audio,fs = torchaudio.load(audio_file)
audio = audio.numpy().reshape(-1)
plps = spafe.features.rplp.plp(sig=audio, fs=fs, order=order)
feat_out = np.mean(plps,axis=0)
return feat_out
test_wav = f'{project_dir}/project_data/train_clean/DCB_se1_ag1_f_01_1_seg_0.wav'
print(extract_feature_plp(test_wav,13))
[-8.89161729e+01 -1.00325089e+00 1.09327901e-03 1.12161967e-03 1.12665492e-03 1.11661652e-03 1.09560193e-03 1.06600807e-03 1.02937231e-03 9.86705184e-04 9.38589743e-04 8.85097211e-04 3.38410298e-03]
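Each extractor in this notebook mean-pools the per-frame coefficients over the frame axis (axis 0) into a single fixed-length vector per utterance, which is what lets utterances of different durations share one feature table. A small sketch with a dummy frame matrix:

```python
import numpy as np

# Hypothetical per-frame feature matrix: 4 frames x 3 coefficients.
frame_feats = np.array([[1.0, 2.0, 3.0],
                        [3.0, 2.0, 1.0],
                        [1.0, 2.0, 3.0],
                        [3.0, 2.0, 1.0]])

# Averaging over the frame axis yields one vector per utterance,
# independent of how many frames the utterance has.
utt_feat = np.mean(frame_feats, axis=0)
print(utt_feat)  # [2. 2. 2.]
```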
# PLP Feature Extraction
accel = True
n_plp = 24
args = (n_plp,)
trainPLP = f'{feature_dir}/plp{n_plp}_train.csv'
testCleanPLP = f'{feature_dir}/plp{n_plp}_test_clean.csv'
testNoisyPLP = f'{feature_dir}/plp{n_plp}_test_noisy.csv'
if(not(os.path.exists(trainPLP))):
train_feat = extract_feature_dir(train_files,extract_feature_plp,args,accel)
if(not(os.path.exists(testCleanPLP))):
test_clean_feat = extract_feature_dir(test_clean_files,extract_feature_plp,args,accel)
if(not(os.path.exists(testNoisyPLP))):
test_noisy_feat = extract_feature_dir(test_noisy_files,extract_feature_plp,args,accel)
# PLP Features
feat_names = ['plp_' + str(n) for n in range(n_plp)]
# Prepare Training Set
if(not(os.path.exists(trainPLP))):
train_feat_df = pd.DataFrame(data=np.stack(train_feat), columns=feat_names)
train_feat_df.to_csv(trainPLP)
else:
train_feat_df = pd.read_csv(trainPLP,index_col=0)
y_train = np.stack(train_label)
# Prepare Test Clean Set
if(not(os.path.exists(testCleanPLP))):
test_clean_feat_df = pd.DataFrame(data=np.stack(test_clean_feat), columns=feat_names)
test_clean_feat_df.to_csv(testCleanPLP)
else:
test_clean_feat_df = pd.read_csv(testCleanPLP,index_col=0)
y_test_clean = np.stack(test_clean_label)
# Prepare Test Noisy Set
if(not(os.path.exists(testNoisyPLP))):
test_noisy_feat_df = pd.DataFrame(data=np.stack(test_noisy_feat), columns=feat_names)
test_noisy_feat_df.to_csv(testNoisyPLP)
else:
test_noisy_feat_df = pd.read_csv(testNoisyPLP,index_col=0)
y_test_noisy = np.stack(test_noisy_label)
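The load-then-cache pattern repeated above (extract features if the CSV is missing, otherwise read it back) could be factored into one helper. A sketch under that assumption; `load_or_extract` is a hypothetical name, not part of this notebook:

```python
import os
import tempfile
import numpy as np
import pandas as pd

def load_or_extract(csv_path, extract_fn, feat_names):
    """Return cached features from csv_path if it exists; otherwise
    run extract_fn(), cache the result as a CSV, and return it."""
    if os.path.exists(csv_path):
        return pd.read_csv(csv_path, index_col=0)
    df = pd.DataFrame(data=np.stack(extract_fn()), columns=feat_names)
    df.to_csv(csv_path)
    return df

# Demo with a dummy extractor and a temporary cache file.
path = os.path.join(tempfile.mkdtemp(), 'demo_train.csv')
df1 = load_or_extract(path, lambda: [np.array([1.0, 2.0])], ['f_0', 'f_1'])
df2 = load_or_extract(path, lambda: [], ['f_0', 'f_1'])  # served from cache
print(np.allclose(df1.values, df2.values))  # True
```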
model = xgboost.XGBClassifier()
model.fit(train_feat_df,y_train)
print("Train PLP Clean Acc =", np.sum(y_train==model.predict(train_feat_df))/len(y_train))
print("Test PLP Clean Acc =", np.sum(y_test_clean==model.predict(test_clean_feat_df))/len(y_test_clean))
print("Test PLP Noisy Acc =", np.sum(y_test_noisy==model.predict(test_noisy_feat_df))/len(y_test_noisy))
print()
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(train_feat_df)
shap.summary_plot(shap_values, train_feat_df)
print()
confusion_matrix_clean = metrics.confusion_matrix(y_test_clean, model.predict(test_clean_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy= metrics.confusion_matrix(y_test_noisy, model.predict(test_noisy_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Train PLP Clean Acc = 0.9931876908621095
Test PLP Clean Acc = 0.680089485458613
Test PLP Noisy Acc = 0.6138328530259366
Power Normalized Cepstral Coefficients (PNCC)¶
sr = 44100
def extract_feature_pncc(audio_file, n_pncc=13):
    """ Extract PNCC Features
    audio_file: WAV sample to extract the PNCC feature from
    n_pncc: number of PNCC coefficients
    feat_out: mean-pooled PNCC feature of the WAV sample
    """
    audio, fs = torchaudio.load(audio_file)
    audio = audio.numpy().reshape(-1)
    pnccs = spafe.features.pncc.pncc(sig=audio, fs=sr, num_ceps=n_pncc)
    feat_out = np.mean(pnccs, axis=0)
    return feat_out
test_wav = f'{project_dir}/project_data/train_clean/DCB_se1_ag1_f_01_1_seg_0.wav'
print(extract_feature_pncc(test_wav,13))
[ 3.04516268e+00 -3.94841739e-02 -1.46968677e-01 -8.90194553e-02 -9.12554395e-02 -1.67210619e-04 -4.02488665e-03 4.84571815e-02 -1.51813209e-02 -1.52985058e-02 -1.63963675e-03 -1.83922195e-02 4.15217359e-05]
# PNCC Feature Extraction
accel = True
n_pncc = 21
args = (n_pncc,)
trainPNCC = f'{feature_dir}/pncc{n_pncc}_train.csv'
testCleanPNCC = f'{feature_dir}/pncc{n_pncc}_test_clean.csv'
testNoisyPNCC = f'{feature_dir}/pncc{n_pncc}_test_noisy.csv'
if(not(os.path.exists(trainPNCC))):
train_feat = extract_feature_dir(train_files,extract_feature_pncc,args,accel)
if(not(os.path.exists(testCleanPNCC))):
test_clean_feat = extract_feature_dir(test_clean_files,extract_feature_pncc,args,accel)
if(not(os.path.exists(testNoisyPNCC))):
test_noisy_feat = extract_feature_dir(test_noisy_files,extract_feature_pncc,args,accel)
# PNCC Features
feat_names = ['pncc_' + str(n) for n in range(n_pncc)]
# Prepare Training Set
if(not(os.path.exists(trainPNCC))):
train_feat_df = pd.DataFrame(data=np.stack(train_feat), columns=feat_names)
train_feat_df.to_csv(trainPNCC)
else:
train_feat_df = pd.read_csv(trainPNCC,index_col=0)
y_train = np.stack(train_label)
# Prepare Test Clean Set
if(not(os.path.exists(testCleanPNCC))):
test_clean_feat_df = pd.DataFrame(data=np.stack(test_clean_feat), columns=feat_names)
test_clean_feat_df.to_csv(testCleanPNCC)
else:
test_clean_feat_df = pd.read_csv(testCleanPNCC,index_col=0)
y_test_clean = np.stack(test_clean_label)
# Prepare Test Noisy Set
if(not(os.path.exists(testNoisyPNCC))):
test_noisy_feat_df = pd.DataFrame(data=np.stack(test_noisy_feat), columns=feat_names)
test_noisy_feat_df.to_csv(testNoisyPNCC)
else:
test_noisy_feat_df = pd.read_csv(testNoisyPNCC,index_col=0)
y_test_noisy = np.stack(test_noisy_label)
model = xgboost.XGBClassifier()
model.fit(train_feat_df,y_train)
print("Train PNCC Clean Acc =", np.sum(y_train==model.predict(train_feat_df))/len(y_train))
print("Test PNCC Clean Acc =", np.sum(y_test_clean==model.predict(test_clean_feat_df))/len(y_test_clean))
print("Test PNCC Noisy Acc =", np.sum(y_test_noisy==model.predict(test_noisy_feat_df))/len(y_test_noisy))
print()
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(train_feat_df)
shap.summary_plot(shap_values, train_feat_df)
print()
confusion_matrix_clean = metrics.confusion_matrix(y_test_clean, model.predict(test_clean_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy= metrics.confusion_matrix(y_test_noisy, model.predict(test_noisy_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Train PNCC Clean Acc = 1.0
Test PNCC Clean Acc = 0.7114093959731543
Test PNCC Noisy Acc = 0.6138328530259366
8. Feature Ensembling¶
# Ensemble any number of feature sets: list the CSV filename prefixes of the features to fuse
features = ["os","gfcc21"]
train_dfs = []
test_clean_dfs = []
test_noisy_dfs = []
for feature in features:
train_dfs.append(pd.read_csv(f'{feature_dir}/{feature}_train.csv', index_col=0))
test_clean_dfs.append(pd.read_csv(f'{feature_dir}/{feature}_test_clean.csv', index_col=0))
test_noisy_dfs.append(pd.read_csv(f'{feature_dir}/{feature}_test_noisy.csv', index_col=0))
train_feat_df = pd.concat(train_dfs, axis=1)
test_clean_feat_df = pd.concat(test_clean_dfs, axis=1)
test_noisy_feat_df = pd.concat(test_noisy_dfs, axis=1)
# Get Feature Names for Columns
feat_names = train_feat_df.columns
y_train = np.stack(train_label)
y_test_clean = np.stack(test_clean_label)
y_test_noisy = np.stack(test_noisy_label)
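The fusion above is a plain column-wise concatenation: `pd.concat(..., axis=1)` aligns rows positionally, so every per-feature CSV must list the utterances in the same order. A toy sketch with hypothetical feature tables:

```python
import pandas as pd

# Two hypothetical feature tables for the same two utterances.
os_df   = pd.DataFrame({'os_0': [0.1, 0.2], 'os_1': [0.3, 0.4]})
gfcc_df = pd.DataFrame({'gfcc_0': [1.0, 2.0]})

# Column-wise concat fuses them into one 2 x 3 feature matrix.
fused = pd.concat([os_df, gfcc_df], axis=1)
print(fused.shape, list(fused.columns))  # (2, 3) ['os_0', 'os_1', 'gfcc_0']
```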
model = xgboost.XGBClassifier()
model.fit(train_feat_df,y_train)
print("Train Fusion Clean Acc =", np.sum(y_train==model.predict(train_feat_df))/len(y_train))
print("Test Fusion Clean Acc =", np.sum(y_test_clean==model.predict(test_clean_feat_df))/len(y_test_clean))
print("Test Fusion Noisy Acc =", np.sum(y_test_noisy==model.predict(test_noisy_feat_df))/len(y_test_noisy))
print()
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(train_feat_df)
shap.summary_plot(shap_values, train_feat_df)
print()
confusion_matrix_clean = metrics.confusion_matrix(y_test_clean, model.predict(test_clean_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy= metrics.confusion_matrix(y_test_noisy, model.predict(test_noisy_feat_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Train Fusion Clean Acc = 1.0
Test Fusion Clean Acc = 0.9507829977628636
Test Fusion Noisy Acc = 0.8184438040345822
9. Blind Set Test¶
This is the pipeline to extract and test the features of the blind clean/noisy test sets for the TA. The best model is the one with feature ensembling in Section 8. Before running this section, make sure all previous sections have been run, since this one depends on the training labels and the OpenSMILE objects defined there.
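Because this section assumes the fitted fusion model is still in memory, one way to decouple it from the earlier cells would be to persist the model with `joblib` (already imported in Setup). A sketch with a stand-in scikit-learn model and a hypothetical filename:

```python
import os
import tempfile
import numpy as np
import joblib
from sklearn.linear_model import LogisticRegression  # stand-in for the XGBClassifier

# Fit a toy model, dump it to disk, and reload it.
model = LogisticRegression().fit(np.array([[0.0], [1.0]]), [0, 1])
path = os.path.join(tempfile.mkdtemp(), 'fusion_model.joblib')
joblib.dump(model, path)
restored = joblib.load(path)
print(restored.predict(np.array([[0.0], [1.0]])))  # [0 1]
```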
# Replace with Actual Directories to the Blind Sets
test_blind_clean_files = glob(f'{project_dir}/project_data/test_clean/*.wav')
test_blind_clean_label = get_label_dir(test_blind_clean_files)
test_blind_noisy_files = glob(f'{project_dir}/project_data/test_noisy/*.wav')
test_blind_noisy_label = get_label_dir(test_blind_noisy_files)
# Feature configuration matching the best model chosen in Section 8
accel = True # Multi-Processing Flag
feature_fns = [extract_smile,extract_feature_gfcc]
feat_names = [list(smile.feature_names),['gfcc_' + str(n) for n in range(21)]]
args = [(0,),(21,)]
# Feature Extraction
for i in range(len(features)):
if(not(os.path.exists(f'{feature_dir}/{features[i]}_test_blind_clean.csv'))):
test_blind_clean_feat = extract_feature_dir(test_blind_clean_files,feature_fns[i],args[i],accel)
test_blind_clean_df = pd.DataFrame(data=np.stack(test_blind_clean_feat), columns=feat_names[i])
test_blind_clean_df.to_csv(f'{feature_dir}/{features[i]}_test_blind_clean.csv')
if(not(os.path.exists(f'{feature_dir}/{features[i]}_test_blind_noisy.csv'))):
test_blind_noisy_feat = extract_feature_dir(test_blind_noisy_files,feature_fns[i],args[i],accel)
test_blind_noisy_df = pd.DataFrame(data=np.stack(test_blind_noisy_feat), columns=feat_names[i])
test_blind_noisy_df.to_csv(f'{feature_dir}/{features[i]}_test_blind_noisy.csv')
# Loading in Features and Testing Against Model
test_blind_clean_dfs = []
test_blind_noisy_dfs = []
for feature in features:
test_blind_clean_dfs.append(pd.read_csv(f'{feature_dir}/{feature}_test_blind_clean.csv', index_col=0))
test_blind_noisy_dfs.append(pd.read_csv(f'{feature_dir}/{feature}_test_blind_noisy.csv', index_col=0))
# All Combined Features
test_blind_clean_df = pd.concat(test_blind_clean_dfs, axis=1)
test_blind_noisy_df = pd.concat(test_blind_noisy_dfs, axis=1)
# Ground Truth Labels
y_blind_test_clean = np.stack(test_blind_clean_label)
y_blind_test_noisy = np.stack(test_blind_noisy_label)
print("\nBlind Test Fusion Clean Acc =", np.sum(y_blind_test_clean==model.predict(test_blind_clean_df))/len(y_blind_test_clean))
print("Blind Test Fusion Noisy Acc =", np.sum(y_blind_test_noisy==model.predict(test_blind_noisy_df))/len(y_blind_test_noisy))
print()
confusion_matrix_clean = metrics.confusion_matrix(y_blind_test_clean, model.predict(test_blind_clean_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_clean, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
print()
confusion_matrix_noisy= metrics.confusion_matrix(y_blind_test_noisy, model.predict(test_blind_noisy_df))
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_noisy, display_labels = ['ROC','LES','DCB','PRV','VLD'])
cm_display.plot()
plt.show()
Blind Test Fusion Clean Acc = 0.9507829977628636
Blind Test Fusion Noisy Acc = 0.8184438040345822